iT邦幫忙

2021 iThome 鐵人賽

DAY 23
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 23

卡夫卡的藏書閣【Book23】- Kafka - KafkaJS 監控狀態事件

  • 分享至 

  • xImage
  •  

“I miss you deeply, unfathomably, senselessly, terribly.”
― Franz Kafka, Letters to Milena


監控狀態事件 ( Instrumentation Events )


部分的操作狀態會被 EventEmitter 監聽,可以用以下這些方法去取得事件的狀態 consumer.on(), producer.on()admin.on(),範例如下

const { HEARTBEAT } = consumer.events
const removeListener = consumer.on(HEARTBEAT, e => console.log(`heartbeat at ${e.timestamp}`))

這些監聽者都是非同步的,消費者不會因為你啟動了這些監聽者而導致阻塞,監聽者若是發生錯誤並不會影響到消費者

以下為一個監聽事件

{
  id: <Number>,
  type: <String>,
  timestamp: <Number>,
  payload: <Object>
}

所有可以監聽的事件清單


消費者
  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 發生在每次向代理發起網路請求時
  • CONNECT
    • payload: 無
    • 消費者連線到代理
  • GROUP_JOIN
    • payload: {groupId, memberId, leaderId, isLeader, memberAssignment, groupProtocol, duration}
    • 消費者加入消費者群組
  • FETCH_START
    • payload: {}
    • 開始從(單數/複數)代理取得訊息
  • FETCH
    • payload: {numberOfBatches, duration}
    • 結束從(單數/複數)代理取得訊息
  • START_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset}
    • 開始使用批次取得訊息
  • END_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset, duration}
    • payload: 無
    • 結束使用批次取得訊息,包含使用方法 eachMessage/eachBatch 時
  • COMMIT_OFFSETS
    • payload: {groupId, memberId, groupGenerationId, topics}
    • 提交偏移量
  • STOP
    • payload: 無
    • 消費者停止運作
  • DISCONNECT
    • payload: 無
    • 消費者中斷連線
  • CRASH
    • payload: {error, groupId, restart}
    • 消費者錯誤,在消費者錯誤的情況下,消費者會想要重啟自己,如果這個錯誤不是重啟可以解決的,消費者會停止並中斷連線,你的應用程式應該要監聽這個事件去做出判斷
  • HEARTBEAT
    • payload: {groupId, memberId, groupGenerationId}
    • 傳送給協調器的心跳
  • REBALANCING
    • payload: {groupId, memberId}
    • 消費者群組開始重新分配
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 發送給代理的請求超時
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同時並存的請求數量是依靠參數 maxInflightRequests 控制,如果數量有變化,這個事件會被觸發
  • RECEIVED_UNSUBSCRIBED_TOPICS
    • payload: {groupId, generationId, memberId, assignedTopics, topicsSubscribed, topicsNotSubscribed}
    • 如果你的消費者群組部分的消費者訂閱的主題跟其餘的消費者訂閱的主題有差異的話,此事件會被觸發

生產者


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 發生在每次向代理發起網路請求時
  • CONNECT
    • payload: 無
    • 生產者連線到代理
  • DISCONNECT
    • payload: 無
    • 生產者中斷連線
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理發起的請求超時
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同時並存的請求數量是依靠參數 maxInflightRequests 控制,如果數量有變化,這個事件會被觸發

管理者 ( admin )


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 發生在每次向代理發起網路請求時
  • CONNECT
    • payload: 無
    • 管理者向代理發起連線
  • DISCONNECT
    • payload: 無
    • 管理者中斷連線
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理發起的請求超時
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同時並存的請求數量是依靠參數 maxInflightRequests 控制,如果數量有變化,這個事件會被觸發

上一篇
卡夫卡的藏書閣【Book22】- Kafka - KafkaJS 消費者 4
下一篇
卡夫卡的藏書閣【Book24】- Kafka - KafkaJS Admin 1
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言